package com.casic.ccp.flying.youth.timeseriesdata.websocket;

import com.casic.ccp.flying.youth.timeseriesdata.model.SocketClient;
import com.casic.ccp.flying.youth.timeseriesdata.service.WebSocketService;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import org.springframework.web.reactive.socket.WebSocketHandler;
import org.springframework.web.reactive.socket.WebSocketMessage;
import org.springframework.web.reactive.socket.WebSocketSession;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

import java.util.concurrent.atomic.AtomicReference;

/**
 * Reactive WebSocket handler annotated for the path {@code /websocket/file/importData}.
 *
 * <p>For each session it builds an outbound message stream whose sink is wrapped, together
 * with the session, in a {@link SocketClient}, and passes every inbound text payload plus
 * that client to {@code WebSocketService.handle}. Despite the class name, this handler does
 * not itself echo messages back; anything written to the client goes through the
 * {@link SocketClient}.
 *
 * @Author: 贺坤
 * @Date: 2021/9/23 18:40
 */
@Component
@WebSocketMapping("/websocket/file/importData")
public class EchoHandler implements WebSocketHandler {
    /**
     * Logger for this handler.
     */
    private static final Logger LOGGER = LoggerFactory.getLogger(EchoHandler.class);

    @Autowired
    private WebSocketService webSocketService;

    /**
     * Wires the inbound and outbound streams of a WebSocket session.
     *
     * @param session the WebSocket session to serve
     * @return a {@code Mono} combining the inbound and outbound pipelines via
     *         {@code Mono.zip}; because both are {@code Mono<Void>}, the result terminates
     *         as soon as either of them terminates
     */
    @Override
    public Mono<Void> handle(WebSocketSession session) {
        // Holder for the client, which only exists once the outbound flux has been subscribed.
        AtomicReference<SocketClient> socketClient = new AtomicReference<>();

        // Outbound: the sink is captured inside a SocketClient when the flux is subscribed.
        Flux<WebSocketMessage> messages = Flux.create(sink -> socketClient.set(new SocketClient(sink, session)));
        Mono<Void> output = session.send(messages);

        // Inbound: decode each message as text and pass it to the service with the client.
        // NOTE(review): socketClient.get() is null until the outbound flux is subscribed;
        // confirm WebSocketService.handle tolerates that or that it cannot happen in practice.
        Mono<Void> input = session.receive()
                .map(WebSocketMessage::getPayloadAsText)
                .doOnNext(msg -> webSocketService.handle(msg, socketClient.get()))
                .then();

        // Join both directions into a single completion signal.
        return Mono.zip(input, output).then();
    }

}